-
Notifications
You must be signed in to change notification settings - Fork 0
xds: Client watcher API changes #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary of ChangesHello @shivaspeaks, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request implements a significant architectural change to the XdsClient's resource watcher API, adopting the principles outlined in gRFC A88 for more robust and nuanced error handling. The core intent is to provide clearer semantics for how resource updates and errors are communicated to watchers, allowing them to differentiate between critical failures requiring resource invalidation and transient issues that can be observed without immediate operational impact. This refactoring touches fundamental components of the Xds client and server, ensuring consistent behavior across the system. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request implements the XdsClient watcher API changes as defined in gRFC A88, replacing the existing OnError, OnResourceDoesNotExist, and OnChanged methods with OnResourceChanged and OnAmbientError. The changes aim to improve error handling and resource management in xDS clients. The review focuses on correctness, efficiency, and maintainability of the changes.
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | ||
| String.format("Error retrieving %s: %s. Details: %s%s", | ||
| toContextString(), | ||
| status.getCode(), | ||
| status.getDescription() != null ? status.getDescription() : "", | ||
| nodeInfo())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Details: section of the error message includes nodeInfo() at the end. However, the nodeInfo() is already included in the overall error message. Consider removing the redundant nodeInfo() call to avoid repetition.
String.format("Error retrieving %s: %s. Details: %s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "")|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request implements the XdsClient watcher API changes as defined in gRFC A88, replacing the existing OnError, OnResourceDoesNotExist, and OnChanged methods with OnResourceChanged and OnAmbientError. The changes modify XdsWatcherBase, XdsServerWrapper, ControlPlaneClient, XdsClient, and XdsClientImpl to align with the new API, and updates related tests to reflect these changes. The review focuses on correctness and maintainability, ensuring the new implementation adheres to the gRFC specifications and existing code conventions.
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | ||
| String.format("Error retrieving %s: %s. Details: %s", | ||
| toContextString(), | ||
| status.getCode(), | ||
| status.getDescription() != null ? status.getDescription() : "")); | ||
|
|
||
| @Override | ||
| public void onResourceDoesNotExist(String resourceName) { | ||
| if (cancelled) { | ||
| return; | ||
| data = StatusOr.fromStatus(translatedStatus); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block translates the Status from the update to a translatedStatus before creating a StatusOr from it. It would be helpful to include the original status as the cause of the translatedStatus exception, to preserve the original error information for debugging purposes.
Also, the status.getDescription() can be null, so it's good that you are handling that.
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | |
| String.format("Error retrieving %s: %s. Details: %s", | |
| toContextString(), | |
| status.getCode(), | |
| status.getDescription() != null ? status.getDescription() : "")); | |
| @Override | |
| public void onResourceDoesNotExist(String resourceName) { | |
| if (cancelled) { | |
| return; | |
| data = StatusOr.fromStatus(translatedStatus); | |
| Status status = update.getStatus(); | |
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | |
| String.format("Error retrieving %s: %s. Details: %s", | |
| toContextString(), | |
| status.getCode(), | |
| status.getDescription() != null ? status.getDescription() : "")).withCause(status.asException()); // Include original status as cause |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist so we don't even need to store in data? Because you suggested to remove this line as well.
data = StatusOr.fromStatus(translatedStatus);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the line data = StatusOr.fromStatus(translatedStatus); should definitely remain. My apologies if my previous suggestion was unclear or misleading in its scope. The purpose of that line is to propagate the translatedStatus to the data field, which is crucial for the watcher to correctly handle the resource error.
My intention was only to enhance the translatedStatus itself by including the original status as a cause, which aids in debugging by preserving the full error context.
Here is a refined suggestion that only modifies the translatedStatus creation, assuming status is already defined on line 654:
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | |
| String.format("Error retrieving %s: %s. Details: %s", | |
| toContextString(), | |
| status.getCode(), | |
| status.getDescription() != null ? status.getDescription() : "")); | |
| @Override | |
| public void onResourceDoesNotExist(String resourceName) { | |
| if (cancelled) { | |
| return; | |
| data = StatusOr.fromStatus(translatedStatus); | |
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | |
| String.format("Error retrieving %s: %s. Details: %s", | |
| toContextString(), | |
| status.getCode(), | |
| status.getDescription() != null ? status.getDescription() : "")).withCause(status.asException()); // Include original status as cause | |
| if (!responseReceived && status.isOk()) { | ||
| // If the ADS stream is closed with OK without ever having received a response, | ||
| // it is a connectivity error. | ||
| statusToPropagate = Status.UNAVAILABLE.withDescription( | ||
| "ADS stream closed with OK before receiving a response"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems to be checking if the ADS stream closed with an OK status before receiving a response, and if so, it's setting the statusToPropagate to UNAVAILABLE. It would be helpful to add a comment explaining why an OK status in this scenario is considered a connectivity error, referencing gRFC A57 for context.
| if (!responseReceived && status.isOk()) { | |
| // If the ADS stream is closed with OK without ever having received a response, | |
| // it is a connectivity error. | |
| statusToPropagate = Status.UNAVAILABLE.withDescription( | |
| "ADS stream closed with OK before receiving a response"); | |
| if (!responseReceived && status.isOk()) { | |
| // If the ADS stream is closed with OK without ever having received a response, | |
| // it is a connectivity error (see gRFC A57). | |
| statusToPropagate = Status.UNAVAILABLE.withDescription( | |
| "ADS stream closed with OK before receiving a response"); |
| StatusOr<LdsUpdate> statusOr = ldsUpdateCaptor.getValue(); | ||
| assertThat(statusOr.hasValue()).isTrue(); | ||
| assertThat(statusOr.getValue()).isEqualTo( | ||
| XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be helpful to add a comment explaining why the statusOr is being captured, and why the hasValue() check is being performed.
verify(ldsWatcher2, timeout(5000)).onResourceChanged(ldsUpdateCaptor.capture());
StatusOr<LdsUpdate> statusOr = ldsUpdateCaptor.getValue();
assertThat(statusOr.hasValue()).isTrue(); // Add comment here
assertThat(statusOr.getValue()).isEqualTo(| // okshiva: flaky | ||
| // verify(mockWatcher, timeout(20000)).onAmbientError(errorCaptor.capture()); | ||
| // assertThat(errorCaptor.getValue().getCode()).isEqualTo(Status.Code.NOT_FOUND); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the changes introduced flakyness for the below assert, so it's being commented out. It would be helpful to add a comment explaining why this assert is flaky, and what conditions cause it to fail.
| // okshiva: flaky | |
| // verify(mockWatcher, timeout(20000)).onAmbientError(errorCaptor.capture()); | |
| // assertThat(errorCaptor.getValue().getCode()).isEqualTo(Status.Code.NOT_FOUND); | |
| ArgumentCaptor<Status> errorCaptor = ArgumentCaptor.forClass(Status.class); | |
| // okshiva: flaky | |
| // verify(mockWatcher, timeout(20000)).onAmbientError(errorCaptor.capture()); |
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request successfully implements the XdsClient watcher API changes as specified in gRFC A88. The transition from the old watcher methods (onChanged, onError, onResourceDoesNotExist) to the new onResourceChanged(StatusOr<T>) and onAmbientError(Status) is well-executed throughout the codebase. The logic to differentiate between definitive and ambient errors is correctly applied in XdsClientImpl and other relevant components. While the implementation and accompanying test updates are largely solid, there are a couple of instances where tests have been commented out due to flakiness. These should be addressed to ensure full test coverage and code stability.
| // okshiva: the changes introduced flakyness for the below assert | ||
| // assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion was commented out, which reduces test coverage. The flakiness is likely due to the change in BootstrapperImpl where ignoreResourceDeletion now defaults to xdsDataErrorHandlingEnabled, a value influenced by an environment variable. To ensure test predictability, please fix this by controlling the environment variable within the test or by mocking GrpcUtil.getFlag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist can you help me fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the flakiness you observed might be due to the xds.data_error_handling.enabled system property or environment variable being set to true in some test environments, which could influence the ignoreResourceDeletion flag. Although the GrpcBootstrapperImpl constructor explicitly sets this to false in this test, it's good practice to ensure test isolation by explicitly controlling such flags.
To resolve the flakiness and re-enable the assertion, you can wrap the relevant part of your test method in a try-finally block to set and restore the system property. This ensures that the test runs with a predictable value for xds.data_error_handling.enabled.
Here's how you can modify the useV2ProtocolByDefault and useV3ProtocolIfV3FeaturePresent test methods:
@Test
public void useV2ProtocolByDefault() throws XdsInitializationException {
String originalFlag = System.getProperty("xds.data_error_handling.enabled");
try {
System.setProperty("xds.data_error_handling.enabled", "false");
BootstrapInfo info = buildBootstrapInfo(
"{\n"
+ " \"xds_servers\": [{\n"
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
+ " \"channel_creds\": [\n"
+ " {\"type\": \"insecure\"}\n"
+ " ]\n"
+ " }]\n"
+ "}");
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.implSpecificConfig()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
} finally {
if (originalFlag == null) {
System.clearProperty("xds.data_error_handling.enabled");
} else {
System.setProperty("xds.data_error_handling.enabled", originalFlag);
}
}
}I've provided a code suggestion below to uncomment the assertion. Please apply the try-finally block as shown above to both affected test methods (useV2ProtocolByDefault and useV3ProtocolIfV3FeaturePresent) to ensure consistent test results.
| // okshiva: the changes introduced flakyness for the below assert | |
| // assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); | |
| assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); |
| // okshiva: flaky | ||
| // assertThat(cdsUpdateCaptor.getValue().getStatus().isOk()).isTrue(); | ||
| // okshiva: I'm skeptical about this behaviour(commented in next line) | ||
| // assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These assertions in used_then_mainServerRestart_fallbackServerUp were commented out due to flakiness. This compromises test coverage for fallback behavior. The flakiness might stem from race conditions during server restart and client reconnection. Instead of disabling the checks, please investigate the root cause. The test could be made more robust, for instance, by using verify with a longer timeout or waiting for a specific state before making assertions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist can you help me fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To address the flakiness in used_then_mainServerRestart_fallbackServerUp, we need to ensure the test waits sufficiently for the expected state changes after the main server restarts and the client reconnects.
-
The assertion
assertThat(cdsUpdateCaptor.getValue().getStatus().isOk()).isTrue()checks if the received CDS update is successful. If this was flaky, it might indicate that an error update was sometimes captured before a successful one, or that the client isn't consistently receiving a successful update from the main server after restart. We will uncomment this assertion. -
The assertion
assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull()checks that the client is no longer reporting Load Reporting Service (LRS) information to the fallback server. This state change might not be instantaneous after the client switches back to the main server. We should explicitly advance thefakeClockto allow time for this state to settle before making the assertion.
Here's a suggested change to make these assertions more robust:
| // okshiva: flaky | |
| // assertThat(cdsUpdateCaptor.getValue().getStatus().isOk()).isTrue(); | |
| // okshiva: I'm skeptical about this behaviour(commented in next line) | |
| // assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); | |
| assertThat(cdsUpdateCaptor.getValue().getStatus().isOk()).isTrue(); | |
| // Advance the fake clock to allow time for LRS state to settle after client reconnects to main server. | |
| fakeClock.advanceTime(5, TimeUnit.SECONDS); | |
| assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); |
| @Override | ||
| public void onChanged(T update) { | ||
| checkNotNull(update, "update"); | ||
| public void onAmbientError(Status error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist I want you to review the XdsDependencyManager and tell me the use of variable 'ambientError'. As per gRFC A88. Right now I'm only focused on implementing xds client watcher API changes.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request implements the XdsClient watcher API changes as specified in gRFC A88. The changes replace the onChanged, onError, and onResourceDoesNotExist methods in XdsClient.ResourceWatcher with onResourceChanged(StatusOr<T> update) and onAmbientError(Status error). This refactoring provides a clearer distinction between definitive resource updates/errors and transient ambient errors. The implementation appears to be thorough, with corresponding updates across the codebase, including XdsDependencyManager, XdsServerWrapper, and XdsClientImpl. The test suite has also been updated to reflect the new API and its behavior. The changes are well-executed and align with the gRFC. I have one minor suggestion regarding a potentially misleading comment to improve code clarity.
| long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); | ||
| long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); | ||
| close(status.asException()); | ||
| rpcRetryTimer = | ||
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); | ||
|
|
||
| xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); | ||
| // Notify the handler of the stream closure before cleaning up the stream state. | ||
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 478 states that the handler is notified before cleaning up the stream state. However, close(status.asException()) is called on line 475, which cleans up the stream state by setting adsStream to null. To align the code with the comment's intent and avoid potential race conditions or unexpected behavior if handleStreamClosed were to depend on the stream's state, it's better to notify the handler before closing the stream.
| long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); | |
| long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); | |
| close(status.asException()); | |
| rpcRetryTimer = | |
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); | |
| xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); | |
| // Notify the handler of the stream closure before cleaning up the stream state. | |
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); | |
| long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); | |
| long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); | |
| // Notify the handler of the stream closure before cleaning up the stream state. | |
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); | |
| close(status.asException()); | |
| rpcRetryTimer = | |
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); |
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request implements the XdsClient watcher API changes as described in gRFC A88, replacing the onChanged, onError, and onResourceDoesNotExist callbacks with onResourceChanged(StatusOr<T>) and onAmbientError(Status). The changes are extensive, touching the core xDS client implementation as well as many server-side components and tests that rely on the watcher API.
The implementation correctly adapts the codebase to the new API, distinguishing between definitive resource errors and transient ambient errors. The test files have also been updated to reflect these changes.
I have a couple of points of feedback:
- There's a potential issue in
BootstrapperImpl.javaregarding the logic for enabling theignore_resource_deletionfeature. It seems to enable the feature based on an environment variable alone, which might contradict the gRFC specification that requires the server to signal support for this feature. - There is a minor inconsistency in error message formatting in
XdsDependencyManager.javawhen handling ambient errors.
Overall, this is a solid implementation of the gRFC. Addressing these points will improve correctness and consistency.
|
|
||
| boolean resourceTimerIsTransientError = false; | ||
| boolean ignoreResourceDeletion = false; | ||
| boolean ignoreResourceDeletion = xdsDataErrorHandlingEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for determining ignoreResourceDeletion seems to enable the feature if xdsDataErrorHandlingEnabled is true, even if the server does not send the ignore_resource_deletion feature. This appears to contradict gRFC A88, which states: "If this feature is not present, then the client MUST NOT ignore resource deletions."
To align with the gRFC, ignoreResourceDeletion should only be enabled when the server explicitly signals support via the ignore_resource_deletion feature. The current implementation could lead to incorrect behavior where the client ignores resource deletions when the server does not expect it to.
I suggest changing this line to initialize ignoreResourceDeletion to false. This ensures that it will only be enabled if the server feature is present in the if block below.
| boolean ignoreResourceDeletion = xdsDataErrorHandlingEnabled; | |
| boolean ignoreResourceDeletion = false; |
| ambientError = error.withDescription( | ||
| String.format("Ambient error for %s: %s. Details: %s%s", | ||
| toContextString(), | ||
| error.getCode(), | ||
| error.getDescription(), | ||
| nodeInfo())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In onAmbientError, error.getDescription() can be null, which would result in the string "null" being embedded in the error message. This is inconsistent with how null descriptions are handled in onResourceChanged (lines 656-661), where a null description is replaced with an empty string.
For consistency and clearer error messages, I suggest handling the potentially null description in the same way.
| ambientError = error.withDescription( | |
| String.format("Ambient error for %s: %s. Details: %s%s", | |
| toContextString(), | |
| error.getCode(), | |
| error.getDescription(), | |
| nodeInfo())); | |
| ambientError = error.withDescription( | |
| String.format("Ambient error for %s: %s. Details: %s%s", | |
| toContextString(), | |
| error.getCode(), | |
| error.getDescription() != null ? error.getDescription() : "", | |
| nodeInfo())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist what is the use of ambientError variable? Why we are not doing anything in onAmbientError here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great question! The ambientError variable and the onAmbientError method are introduced as part of the XdsClient watcher API changes, specifically detailed in gRFC A88.
-
Use of
ambientErrorvariable:- The
ambientErrorvariable (declared on line 637 in theXdsDependencyManager.javafile) is intended to store transient errors that occur in the xDS client. These are errors that provide useful information about the general state of the XdsClient but should not cause the watcher to discard its currently active, valid resource. - It's marked with
@SuppressWarnings("unused")because, in this specificXdsWatcherBaseclass, its primary purpose is to be set and potentially retrieved by other components or for logging/metrics, rather than directly triggering an action within this base class itself. The actual use of this ambient error information would typically happen in concrete implementations or higher-level logic that consumes theXdsWatcherBase.
- The
-
Why nothing is done in
onAmbientError(to change the resource state):- As per gRFC A88,
onAmbientErroris designed for errors that "should not modify the watcher's use of that resource but that may be useful information about the ambient state of the XdsClient." This means that unlikeonResourceChanged(which can deliver an error that does invalidate the current resource),onAmbientErrorexplicitly should not cause the watcher to stop using its previously received valid resource. - Therefore, the implementation of
onAmbientErrorinXdsDependencyManager.javacorrectly captures the error inambientErrorand formats a log message, but it does not update thedatafield (which holds the active resource) or callmaybePublishConfig(), because doing so would imply a change in the active resource, which is contrary to the definition of an "ambient" error.
- As per gRFC A88,
My previous suggestion was to ensure that even when an ambient error's description is null, it's handled gracefully (by replacing null with an empty string) to prevent the literal string "null" from appearing in the formatted error message, improving clarity and consistency.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request implements the API changes for xDS client watchers as specified in gRFC A88. The changes replace the onChanged, onError, and onResourceDoesNotExist methods with onResourceChanged(StatusOr<T>) and onAmbientError(Status). This is a significant and necessary refactoring to better handle resource updates and errors. The implementation across the codebase, including XdsDependencyManager, XdsServerWrapper, and XdsClientImpl, appears to correctly adopt the new API and its semantics. The logic for distinguishing between definitive and ambient errors is well-implemented. The test suite has been extensively updated to reflect these API changes, which is great to see. I have a few suggestions, mainly in the test code, to improve correctness and clarity.
| List<ResourceWatcher<RdsUpdate>> rdsCopy = rdsWatchers.values().stream() | ||
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | ||
| List<ResourceWatcher<CdsUpdate>> cdsCopy = cdsWatchers.values().stream() | ||
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | ||
| List<ResourceWatcher<EdsUpdate>> edsCopy = edsWatchers.values().stream() | ||
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | ||
| syncContext.execute(() -> { | ||
| rdsWatchers.values().stream() | ||
| .flatMap(List::stream) | ||
| .forEach(w -> w.onError(error)); | ||
| cdsWatchers.values().stream() | ||
| .flatMap(List::stream) | ||
| .forEach(w -> w.onError(error)); | ||
| rdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | ||
| cdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | ||
| edsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a potential race condition here. The watcher lists (rdsCopy, cdsCopy, edsCopy) are created by iterating over rdsWatchers.values(), etc., outside the syncContext.execute() block. However, the underlying maps (rdsWatchers, cdsWatchers, edsWatchers) can be modified concurrently by other operations within the syncContext (e.g., cancelXdsResourceWatch). This could lead to a ConcurrentModificationException or other concurrency issues.
To ensure thread safety, the creation of these lists should be moved inside the syncContext.execute() block.
| List<ResourceWatcher<RdsUpdate>> rdsCopy = rdsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| List<ResourceWatcher<CdsUpdate>> cdsCopy = cdsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| List<ResourceWatcher<EdsUpdate>> edsCopy = edsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| syncContext.execute(() -> { | |
| rdsWatchers.values().stream() | |
| .flatMap(List::stream) | |
| .forEach(w -> w.onError(error)); | |
| cdsWatchers.values().stream() | |
| .flatMap(List::stream) | |
| .forEach(w -> w.onError(error)); | |
| rdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| cdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| edsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| }); | |
| syncContext.execute(() -> { | |
| List<ResourceWatcher<RdsUpdate>> rdsCopy = rdsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| List<ResourceWatcher<CdsUpdate>> cdsCopy = cdsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| List<ResourceWatcher<EdsUpdate>> edsCopy = edsWatchers.values().stream() | |
| .flatMap(List::stream).collect(java.util.stream.Collectors.toList()); | |
| rdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| cdsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| edsCopy.forEach(w -> w.onResourceChanged(StatusOr.fromStatus(error))); | |
| }); |
| verify(mockDirectPathWatcher, times(1)).onResourceChanged(any()); | ||
| verify(mockDirectPathWatcher, never()).onAmbientError(any()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test isolatedResourceDeletions seems to be missing a verification. It correctly verifies that mockDirectPathWatcher (for the resource that is not deleted) is not affected by the deletion. However, it no longer verifies that mockWatcher (for the deleted resource) receives a deletion notification. The old test had verify(mockWatcher, timeout(20000)).onResourceDoesNotExist("test-server");. With the new API, this should be a call to onResourceChanged with a NOT_FOUND status. It would be good to add this verification back to ensure the deletion notification is correctly propagated to the right watcher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist help me with the fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right to ask for help with this fix! To properly verify the deletion notification for mockWatcher using the new API, you should check for an onResourceChanged call with a NOT_FOUND status, and ensure the description contains the resource name.
Here's the suggested code to replace lines 132-133, which adds the missing verification for mockWatcher while retaining the existing verifications for mockDirectPathWatcher:
| verify(mockDirectPathWatcher, times(1)).onResourceChanged(any()); | |
| verify(mockDirectPathWatcher, never()).onAmbientError(any()); | |
| verify(mockWatcher, timeout(20000)).onResourceChanged(argThat(statusOr -> { | |
| return !statusOr.hasValue() | |
| && statusOr.getStatus().getCode() == Status.Code.NOT_FOUND | |
| && statusOr.getStatus().getDescription().contains("test-server"); | |
| })); | |
| verify(mockDirectPathWatcher, times(1)).onResourceChanged(any()); | |
| verify(mockDirectPathWatcher, never()).onAmbientError(any()); |
| verify(mockDirectPathWatcher, times(1)).onResourceChanged(any()); | |
| verify(mockDirectPathWatcher, never()).onAmbientError(any()); | |
| verify(mockWatcher, timeout(20000)).onResourceChanged(argThat(statusOr -> { | |
| return !statusOr.hasValue() | |
| && statusOr.getStatus().getCode() == Status.Code.NOT_FOUND | |
| && statusOr.getStatus().getDescription().contains("test-server"); | |
| })); | |
| verify(mockDirectPathWatcher, times(1)).onResourceChanged(any()); | |
| verify(mockDirectPathWatcher, never()).onAmbientError(any()); |
| // LDS 2: resource not found. | ||
| reset(mockListener); | ||
| when(mockListener.onResult2(any())).thenReturn(Status.OK); | ||
| xdsClient.deliverLdsResourceNotFound(); | ||
| assertEmptyResolutionResult(expectedLdsResourceName); | ||
| xdsClient.deliverLdsResourceDeletion(); | ||
| verify(mockListener, never()).onResult2(any()); | ||
| // Verify shutdown. | ||
| assertThat(lds1Filter1.isShutdown()).isTrue(); | ||
| assertThat(lds1Filter2.isShutdown()).isTrue(); | ||
| assertThat(lds1Filter1.isShutdown()).isFalse(); | ||
| assertThat(lds1Filter2.isShutdown()).isFalse(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test filterState_shutdown_onLdsNotFound now tests behavior for deliverLdsResourceDeletion(), which calls onAmbientError. An ambient error correctly does not shut down filters. However, the test name is now misleading as it suggests it's testing a "not found" scenario that leads to a shutdown. Please consider renaming this test to reflect that it's testing an ambient error scenario, for example, filterState_noShutdown_onLdsDeletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist Is this test correct logically? Should we change the test? Why is the function of test changed altogether? Does filterState_shutdown_onLdsNotFound doesnt hold true at all with new APIs? Should we create a new test for filterState_noShutdown_onLdsDeletion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the questions, shivaspeaks! Let me clarify the situation regarding the test filterState_shutdown_onLdsNotFound.
The core of the change in this Pull Request is the update to the XdsClient.ResourceWatcher API. Previously, there were three methods:
onChanged(T update): For successful resource updates.onError(Status error): For transient errors.onResourceDoesNotExist(String resourceName): For definitive resource not found scenarios.
This PR introduces a new API with two methods:
onResourceChanged(StatusOr<T> update): This single method now handles both successful updates (whenupdate.hasValue()is true) and definitive resource errors likeNOT_FOUND(whenupdate.hasValue()is false andupdate.getStatus()indicates an error).onAmbientError(Status error): This is for transient errors that should not cause the watcher to stop using a previously received valid resource. The gRFC A88 states: "the watcher should not stop using the previously seen resource, and the XdsClient will not remove the resource from its cache."
Now, let's address your specific questions:
-
Is this test correct logically?
- The assertions in the modified test (
assertThat(lds1Filter1.isShutdown()).isFalse();andassertThat(lds1Filter2.isShutdown()).isFalse();) are logically correct for the action being performed (xdsClient.deliverLdsResourceDeletion()). This action, as per the new API, triggersonAmbientError, which should indeed not shut down the filters. - However, the test name
filterState_shutdown_onLdsNotFoundis now misleading. It implies a "resource not found" scenario that leads to a shutdown, but the code is testing an "ambient error" scenario that does not lead to a shutdown.
- The assertions in the modified test (
-
Should we change the test?
- Yes, the test should be updated to align its name with the behavior it's testing, or to test the scenario implied by its name.
-
Why is the function of test changed altogether?
- The function of the test changed because
xdsClient.deliverLdsResourceNotFound()(which previously caused a shutdown) was replaced withxdsClient.deliverLdsResourceDeletion(). As explained above,deliverLdsResourceDeletion()now maps to the newonAmbientError()API, which has different semantics regarding resource shutdown compared to the oldonResourceDoesNotExist()(whichdeliverLdsResourceNotFound()used to call).
- The function of the test changed because
-
Does
filterState_shutdown_onLdsNotFounddoesn't hold true at all with new APIs?- The concept of a "resource not found" leading to a shutdown still holds true with the new APIs. The equivalent of the old
onResourceDoesNotExist()is now handled byonResourceChanged(StatusOr.fromStatus(NOT_FOUND)). This event should cause a shutdown of the filters. - The current test, however, is testing the
onAmbientErrorpath, which explicitly states it should not cause a shutdown.
- The concept of a "resource not found" leading to a shutdown still holds true with the new APIs. The equivalent of the old
-
Should we create a new test for
filterState_noShutdown_onLdsDeletion?- Yes, I recommend creating a new test for the
onAmbientErrorscenario, perhaps namedfilterState_noShutdown_onLdsDeletionas suggested. This new test would correctly usexdsClient.deliverLdsResourceDeletion()and assert that the filters are not shut down. - The existing test
filterState_shutdown_onLdsNotFoundshould then be reverted to its original intent: it should callxdsClient.deliverLdsResourceNotFound()(which now correctly triggersonResourceChanged(StatusOr.fromStatus(NOT_FOUND))) and assert that the filters are shut down (isTrue()). This would ensure both important scenarios are covered and clearly named.
- Yes, I recommend creating a new test for the
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist then help me with the test for both filterState_noShutdown_onLdsDeletion and filterState_shutdown_onLdsNotFound.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, shivaspeaks! I can certainly help with that. Based on our previous discussion, the goal is to:
- Create a new test
filterState_noShutdown_onLdsDeletionto specifically test the scenario wheredeliverLdsResourceDeletion()(which maps toonAmbientError) does not shut down the filters. This test will essentially take the logic currently present in yourfilterState_shutdown_onLdsNotFoundmethod. - Restore the original intent of
filterState_shutdown_onLdsNotFoundto test the scenario wheredeliverLdsResourceNotFound()(which maps toonResourceChanged(StatusOr.fromStatus(NOT_FOUND))) does shut down the filters.
Here's the suggested new test method filterState_noShutdown_onLdsDeletion. You can add this method after your existing filterState_shutdown_onLdsNotFound method:
@Test
public void filterState_noShutdown_onLdsDeletion() throws Exception {
StatefulFilter.Provider lds1Filter1 = mock(StatefulFilter.Provider.class);
StatefulFilter.Provider lds1Filter2 = mock(StatefulFilter.Provider.class);
filterStateTestSetupResolver(lds1Filter1, lds1Filter2);
// LDS 2: resource deletion (ambient error).
reset(mockListener);
when(mockListener.onResult2(any())).thenReturn(Status.OK);
xdsClient.deliverLdsResourceDeletion();
verify(mockListener, never()).onResult2(any());
// Verify no shutdown.
assertThat(lds1Filter1.isShutdown()).isFalse();
assertThat(lds1Filter2.isShutdown()).isFalse();
}And here is the code_suggestion to modify your existing filterState_shutdown_onLdsNotFound method (lines 1561-1569) to correctly test the resource not found scenario that leads to shutdown. This restores its original purpose with the new API semantics:
| // LDS 2: resource not found. | |
| reset(mockListener); | |
| when(mockListener.onResult2(any())).thenReturn(Status.OK); | |
| xdsClient.deliverLdsResourceNotFound(); | |
| assertEmptyResolutionResult(expectedLdsResourceName); | |
| xdsClient.deliverLdsResourceDeletion(); | |
| verify(mockListener, never()).onResult2(any()); | |
| // Verify shutdown. | |
| assertThat(lds1Filter1.isShutdown()).isTrue(); | |
| assertThat(lds1Filter2.isShutdown()).isTrue(); | |
| assertThat(lds1Filter1.isShutdown()).isFalse(); | |
| assertThat(lds1Filter2.isShutdown()).isFalse(); | |
| } | |
| // LDS 2: resource not found. | |
| reset(mockListener); | |
| when(mockListener.onResult2(any())).thenReturn(Status.OK); | |
| xdsClient.deliverLdsResourceNotFound(); // This now triggers onResourceChanged(StatusOr.fromStatus(NOT_FOUND)) | |
| assertEmptyResolutionResult(expectedLdsResourceName); | |
| // Verify shutdown. | |
| assertThat(lds1Filter1.isShutdown()).isTrue(); | |
| assertThat(lds1Filter2.isShutdown()).isTrue(); | |
| } |
| // RDS 2: RDS_RESOURCE_NAME not found. | ||
| reset(mockListener); | ||
| when(mockListener.onResult2(any())).thenReturn(Status.OK); | ||
| xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME); | ||
| assertEmptyResolutionResult(RDS_RESOURCE_NAME); | ||
| assertThat(lds1Filter1.isShutdown()).isTrue(); | ||
| assertThat(lds1Filter2.isShutdown()).isTrue(); | ||
| xdsClient.deliverAmbientError(RDS_RESOURCE_NAME, Status.NOT_FOUND); | ||
| verify(mockListener, never()).onResult2(any()); | ||
| assertThat(lds1Filter1.isShutdown()).isFalse(); | ||
| assertThat(lds1Filter2.isShutdown()).isFalse(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gemini-code-assist just like previous comment, help me write both tests filterState_shutdown_onRdsNotFound and filterState_noShutdown_onRdsAmbientError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Certainly! You're right, having both tests will improve clarity and coverage.
First, please rename the current test method filterState_shutdown_onRdsNotFound (starting at line 1617) to filterState_noShutdown_onRdsAmbientError. The existing code within this method is already correctly testing the scenario where filters are not shut down on an ambient error.
Next, here is the code for the new filterState_shutdown_onRdsNotFound test method. This test verifies that filters are shut down when an RDS resource is definitively not found. You can add this method after the filterState_noShutdown_onRdsAmbientError method.
@Test
public void filterState_shutdown_onRdsNotFound() {
StatefulFilter.Provider filterProvider = filterStateTestSetupResolver();
StatefulFilter lds1Filter1 = filterProvider.create("filter1");
StatefulFilter lds1Filter2 = filterProvider.create("filter2");
// Initial setup to get filters active
xdsClient.deliverLdsUpdateWithFilters(
VirtualHost.create("virtual-host", Collections.singletonList("domain"),
Collections.singletonList(Route.create(
RouteMatch.create(null, null, null),
RouteAction.newBuilder().setCluster(RDS_RESOURCE_NAME).build()))),
ImmutableList.of(
new NamedFilterConfig("filter1", lds1Filter1),
new NamedFilterConfig("filter2", lds1Filter2)));
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME,
Collections.singletonList(VirtualHost.create("virtual-host",
Collections.singletonList("domain"),
Collections.singletonList(Route.create(
RouteMatch.create(null, null, null),
RouteAction.newBuilder().setCluster("cluster-name").build())))));
assertResolutionResult(expectedLdsResourceName, "cluster-name");
assertThat(lds1Filter1.isShutdown()).isFalse();
assertThat(lds1Filter2.isShutdown()).isFalse();
// RDS 2: RDS_RESOURCE_NAME not found (definitive error).
reset(mockListener);
when(mockListener.onResult2(any())).thenReturn(Status.OK);
xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME);
assertEmptyResolutionResult(RDS_RESOURCE_NAME);
assertThat(lds1Filter1.isShutdown()).isTrue();
assertThat(lds1Filter2.isShutdown()).isTrue();
}|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request correctly implements the new watcher API from gRFC A88, which is a significant and positive change for xDS error handling. The changes are consistently applied across the codebase, and the refactoring in areas like XdsClientImpl.handleStreamClosed greatly improves code clarity. The test suite has also been thoroughly updated to reflect the new API and its behavior. I have one minor suggestion to improve efficiency in XdsServerWrapper.java.
| List<FilterChain> allFilterChains = new ArrayList<>(filterChains); | ||
| if (defaultFilterChain != null) { | ||
| allFilterChains = new ArrayList<>(filterChains); | ||
| allFilterChains.add(defaultFilterChain); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is correct, but it introduces a small performance regression by always creating a new ArrayList, even when defaultFilterChain is null. The previous implementation was more efficient as it only created a new list when necessary. Consider reverting to a pattern similar to the original implementation to avoid the unnecessary allocation, for example:
List<FilterChain> allFilterChains = filterChains;
if (defaultFilterChain != null) {
allFilterChains = new ArrayList<>(filterChains);
allFilterChains.add(defaultFilterChain);
}
Implements XdsClient watcher API changes of gRFC A88- https://github.com/grpc/proposal/blob/master/A88-xds-data-error-handling.md
Contents of the specific implemented part of grfc A88:
Changes to XdsClient Watcher APIs
The current XdsClient API has three methods:
OnResourceChanged(ResourceType resource): Invoked whenever a new version of the resource is received from the xDS server.
OnError(Status status): Invoked for all transient errors and for some data errors (NACKs). The watcher is generally expected to ignore the error if it already has a valid cached resource.
OnResourceDoesNotExist(): Invoked specifically for the does-not-exist case. The watcher is generally expected to stop using any previously cached resource and put itself into a failing state.
Note: The mechanism described in gRFC A53 inhibits these notifications for only the case where we already had a cached version of the resource, which in the SotW protocol applies only to LDS and CDS. That mechanism does not affect the case where we do not already have a cached version of the resource, which is where the does-not-exist timer is used.
These methods do not map well to the desired error handling behavior defined above.
This proposal replaces those methods with the following two methods:
OnResourceChanged(StatusOr resource): Will be invoked to notify the watcher of a new version of the resource received from the xDS server or an error indicating the reason why the resource cannot be obtained. Note that if an error is passed to this method after the watcher has previously seen a valid resource, the watcher is expected to stop using that previously delivered resource. In this case, the XdsClient will remove the resource from its cache, so that CSDS (see gRFC A40) and XdsClient metrics (see gRFC A78) will not reflect a resource that the client is not actually using.
OnAmbientError(Status status): Will be invoked to notify the watcher of an error that occurs after a resource has been received that should not modify the watcher's use of that resource but that may be useful information about the ambient state of the XdsClient. In particular, the watcher should not stop using the previously seen resource, and the XdsClient will not remove the resource from its cache. However, the error message may be useful as additional context to include in errors that are being generated for other reasons. For example, if failing a data plane RPC due to all endpoints being unreachable, it may be useful to also report that the client has lost contact with the xDS server. The error condition is considered to be cleared when either OnResourceChanged() is invoked or OnAmbientError() is invoked again with an OK status.